package cn.galudisu.spark._1_dataframes

/**
  * Spark DataFrame: create a DataFrame from a CSV file and perform SQL-like operations on it.
  *
  * @author galudisu
  */

import cn.galudisu.spark.Environment
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import scala.collection.JavaConverters._

object DataFrameCSV extends App with Environment {

  //The SparkConf holds all the information for running this Spark 'cluster'
  //For this recipe, we are running locally. And we intend to use just 2 cores in the machine - local[2]
  val conf = new SparkConf().setAppName("csvDataFrame").setMaster("local[2]")

  //SparkSession is the unified entry point in Spark 2.x. We keep a handle on the
  //session itself (so we can stop it at the end) and expose its SQLContext for
  //the SQL-style queries below.
  val spark = SparkSession.builder().config(conf = conf).getOrCreate()
  val sqlContext = spark.sqlContext

  //All sample files share the same layout: a header row and pipe-separated fields.
  //Spark 2.x ships a built-in CSV source (DataFrameReader.csv), which supersedes
  //the external com.databricks.spark.csv package used by older recipes.
  private def loadPipeCsv(path: String): DataFrame =
    spark.read
      .option("header", "true")
      .option("delimiter", "|")
      .csv(path)

  //Now, lets load our pipe-separated file
  //students is of type org.apache.spark.sql.DataFrame
  val students = loadPipeCsv("csv/StudentData.csv")

  //Print the schema of this input (side effect, hence the explicit parens)
  students.printSchema()

  //Sample n records along with headers
  students.show(3)

  //Sample 20 records along with headers (show() defaults to 20 rows)
  students.show()

  //Sample the first 5 records
  students.head(5).foreach(println)

  //Alias of head
  students.take(5).foreach(println)

  //Select just the email id to a different dataframe
  val emailDataFrame: DataFrame = students.select("email")

  emailDataFrame.show(3)

  //Select more than one column and create a different dataframe
  val studentEmailDF = students.select("studentName", "email")

  studentEmailDF.show(3)

  // Filtering by condition

  // Show up to 7 records whose id is greater than 5
  students.filter("id > 5").show(7)

  // Records whose studentName is the empty string
  students.filter("studentName =''").show(7)

  println("student null")

  // Multiple conditions combined with OR
  students.filter("studentName ='' OR studentName='NULL'").show(7)

  //Get all students whose name starts with the letter 'M' — SQL-like query.
  //SUBSTR positions are 1-based in Spark SQL (position 0 is silently treated as 1,
  //but 1 states the intent explicitly).
  students.filter("SUBSTR(studentName,1,1) ='M'").show(7)


  // Relational-style querying
  //The real power of DataFrames lies in the way we could treat it like a relational table and use SQL to query
  //Step 1. Register the students dataframe as a table with name "students" (or any name)
  students.createOrReplaceTempView("students")

  //Step 2. Query it away
  val dfFilteredBySQL = sqlContext.sql("select * from students where studentName!='' order by email desc")

  dfFilteredBySQL.show(7)

  //Two more datasets with the same layout, used to demonstrate joins
  val students1 = loadPipeCsv("csv/StudentPrep1.csv")
  val students2 = loadPipeCsv("csv/StudentPrep2.csv")

  // Inner join on id (show all matched rows)
  val studentsJoin = students1.join(students2, students1("id") === students2("id"))
  studentsJoin.show(studentsJoin.count.toInt)

  // Right outer join: keeps every row of students2, nulls where students1 has no match
  val studentsRightOuterJoin = students1.join(students2, students1("id") === students2("id"), "right_outer")
  studentsRightOuterJoin.show(studentsRightOuterJoin.count.toInt)

  // Left outer join: keeps every row of students1, nulls where students2 has no match
  val studentsLeftOuterJoin = students1.join(students2, students1("id") === students2("id"), "left_outer")
  studentsLeftOuterJoin.show(studentsLeftOuterJoin.count.toInt)

  // Sorting

  //You could also optionally order the dataframe by column without registering it as a table.
  //Order by descending order
  students.sort(students("studentName").desc).show(10)

  //Order by a list of column names - without using SQL (multi-column sort)
  students.sort("studentName", "id").show(10)

  println("Students and Id descending")
  students.sort(students("studentName").desc, students("id").asc).show(10)

  //Now, let's save the modified dataframe with a new name
  val options = Map("header" -> "true", "path" -> "csv/ModifiedStudent.csv")

  // Rename columns, SQL-like
  //Modify dataframe - pick studentname and email columns, change 'studentName' column name to just 'name'
  val copyOfStudents = students.select(students("studentName").as("name"), students("email"))

  // Persist to file
  copyOfStudents.show()
  //Save this new dataframe with headers and with file name "ModifiedStudent.csv",
  //using the built-in csv writer (side effect, hence the explicit parens on save)
  copyOfStudents.write.format("csv").options(options).mode(SaveMode.Overwrite).save()

  //Load the saved data and verify the schema and list some records.
  //The same options map supplies both the header flag and the path.
  val newStudents = sqlContext.read.format("csv").options(options).load()
  newStudents.printSchema()
  println("new Students")
  newStudents.show()

  //Release the local cluster's resources — the session was never stopped in the
  //original recipe, leaving executor threads alive until JVM exit.
  spark.stop()

}